// Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#include <thread>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "pure_mem/std_out_logger.h"
#include "rocksdb/perf_context.h"
#include "util/fault_injection_test_env.h"
#if !defined(ROCKSDB_LITE)
#include "util/sync_point.h"
#endif

namespace rocksdb {

class PureMemBasicTest : public DBTestBase {
 public:
  PureMemBasicTest() : DBTestBase("/db_basic_test") {
    last_options_.info_log.reset(new StdOutLogger());
    FlushL0CacheTryReopen();
  }

  Status FlushL0CacheTryReopen() {
    Close();
    return DB::Open(last_options_, dbname_, &db_);
  }

  static Slice genMVCCKey1(const char* key) {
    uint32_t keySize = strlen(key) + 1;
    char* ret = new char[keySize];
    memset(ret, '\0', keySize);
    memcpy(ret, key, strlen(key));
    return Slice(ret, keySize);
  }

  static void WriteDataTest(class PureMemBasicTest *threadarg);
  static void SWriteDataTest(class PureMemBasicTest *threadarg);

};

std::mutex mu;
void PureMemBasicTest::WriteDataTest(class PureMemBasicTest *threadarg)
{
  // Jacob: 对传入的参数进行强制类型转换，由无类型指针变为整形数指针，然后再读取
  static int threads_id =0;
  mu.lock();
  ++threads_id;
  int thread_id =threads_id;
  mu.unlock();
  PureMemBasicTest *w_data;
  w_data = threadarg;
  for (int k = 0; k < 100; ++k) {
    int key = thread_id * 10000 + k * 10;
    ASSERT_OK(w_data->Put(genMVCCKey1(ToString(key).data()), std::string(2, 'v')));
    ASSERT_OK(w_data->Put(genMVCCKey1(ToString(key+1).data()), "v1"));
    ASSERT_OK(w_data->Put(genMVCCKey1(ToString(key+2).data()), "v2"));
    ASSERT_OK(w_data->Put(genMVCCKey1(ToString(key+3).data()), "v3"));
    ASSERT_OK(w_data->Put(genMVCCKey1(ToString(key+4).data()), "v4"));
  };
}

void PureMemBasicTest::SWriteDataTest(class PureMemBasicTest *threadarg)
{
  // Jacob: 对传入的参数进行强制类型转换，由无类型指针变为整形数指针，然后再读取
  static int threads_id =0;
  mu.lock();
  ++threads_id;
  int thread_id =threads_id;
  mu.unlock();
  PureMemBasicTest *w_data;
  w_data = threadarg;
  for (int k = 0; k < 100; ++k) {
    int key = thread_id * 10000 + k * 10;
    ASSERT_OK(w_data->Put(genMVCCKey1(ToString(key).data()), std::string(2, 'v')));
    ASSERT_OK(w_data->Put(genMVCCKey1(ToString(key+1).data()), "v1"));
    ASSERT_OK(w_data->Put(genMVCCKey1(ToString(key+2).data()), "v2"));
    ASSERT_OK(w_data->Put(genMVCCKey1(ToString(key+3).data()), "v3"));
    ASSERT_OK(w_data->Put(genMVCCKey1(ToString(key+4).data()), "v4"));
  };
}


TEST_F(PureMemBasicTest, MultiWritePurememTest){
  Options options = CurrentOptions();
  options.pureMemTable = true;
  Reopen(options);
  std::thread threads[10];
  for (auto & i : threads)
    i = std::thread(WriteDataTest, this);
  for (auto &thread : threads)
    thread.join();

  for(int i = 1;i<=10;i++){
    for(int j=0;j<100;j++){
      int key = i * 10000 + j *10;
      ASSERT_EQ(Get(genMVCCKey1(ToString(key).data()).ToString()),"vv");
      ASSERT_EQ(Get(genMVCCKey1(ToString(key+1).data()).ToString()),"v1");
      ASSERT_EQ(Get(genMVCCKey1(ToString(key+2).data()).ToString()),"v2");
      ASSERT_EQ(Get(genMVCCKey1(ToString(key+3).data()).ToString()),"v3");
      ASSERT_EQ(Get(genMVCCKey1(ToString(key+4).data()).ToString()),"v4");
    }
  }
}

TEST_F(PureMemBasicTest, SimpleTest){
  Options options = CurrentOptions();
  options.pureMemTable = true;
  Reopen(options);
  std::thread threads[10];
  for (int id = 0; id < 10; ++id)
    threads[id] = std::thread([&](int lhs){
      for(int i = 0; i < 10000; ++i){
        std::string key = "key" + ToString(lhs * 10000 + i);
        ASSERT_OK(Put(genMVCCKey1(key.data()), "value"));
      }
    }, id);
  for (auto &thread : threads)
    thread.join();

  for(int i = 0; i < 10; ++i){
    for(int j = 0; j < 10000; ++j){
      std::string key = "key" + ToString(i * 10000 + j);
      ASSERT_EQ(Get(genMVCCKey1(key.data()).ToString()),"value");
    }
  }
}

std::string OpenGet(DB* db,const std::string& k) {
  ReadOptions options;

  std::string result;
  Status s = db->Get(options, k, &result);
  if (s.IsNotFound()) {
    result = "NOT_FOUND";
  } else if (!s.ok()) {
    result = s.ToString();
  }
  return result;
}

TEST_F(PureMemBasicTest, PureMemOpenTest) {
  Close();
  Options options = CurrentOptions();
  options.env = env_;
  options.pureMemTable = true;
//  options.encode_version_node = false;
  rocksdb::DB* db2 = nullptr;
  rocksdb::Status s = DB::Open(options, dbname_+"/db_open_test", &db2);
  WriteOptions writeOptions = WriteOptions();
  ASSERT_OK(db2->Put(writeOptions,genMVCCKey1("542"), "v2"));
  ASSERT_OK(db2->Put(writeOptions,genMVCCKey1("444"), "v1"));
  ASSERT_OK(db2->Put(writeOptions,genMVCCKey1("ps"), "v3"));
  ASSERT_OK(db2->Put(writeOptions,genMVCCKey1("ps"), "v4"));

  ASSERT_EQ("v1", OpenGet(db2,genMVCCKey1("444").ToString()));
  ASSERT_EQ("v2", OpenGet(db2,genMVCCKey1("542").ToString()));
  ASSERT_EQ("v4", OpenGet(db2,genMVCCKey1("ps").ToString()));
  db2->Close();
  delete db2;
  s = DB::Open(options, dbname_+"/db_open_test", &db2);
  ASSERT_EQ("v1", OpenGet(db2,genMVCCKey1("444").ToString()));
  ASSERT_EQ("v2", OpenGet(db2,genMVCCKey1("542").ToString()));
  ASSERT_EQ("v4", OpenGet(db2,genMVCCKey1("ps").ToString()));
}

TEST_F(PureMemBasicTest, PureMemHugeDataTest) {
  Options options = CurrentOptions();
  options.pureMemTable = true;
//  options.encode_version_node = false;
  Reopen(options);

  for(int i = 0;i < 300000;i++){
    std::string str_key = "key" + ToString(i);
    const char* key = str_key.data();
    ASSERT_OK(Put(genMVCCKey1(key), std::string("v", 1024)));
  }
  ASSERT_EQ(std::string("v", 1024), Get(genMVCCKey1("key2333").ToString()));
}

TEST_F(PureMemBasicTest, SerialMultiWritePurememTest){
  Options options = CurrentOptions();
  options.pureMemTable = true;
//  options.encode_version_node = false;
  options.allow_concurrent_memtable_write = false;
  Reopen(options);
  std::thread threads[10];
  for (auto & i : threads)
    i = std::thread(SWriteDataTest, this);
  for (auto &thread : threads)
    thread.join();

  for(int i = 1;i<=10;i++){
    for(int j=0;j<100;j++){
      int key = i * 10000 + j *10;
      ASSERT_EQ(Get(genMVCCKey1(ToString(key).data()).ToString()),"vv");
      ASSERT_EQ(Get(genMVCCKey1(ToString(key+1).data()).ToString()),"v1");
      ASSERT_EQ(Get(genMVCCKey1(ToString(key+2).data()).ToString()),"v2");
      ASSERT_EQ(Get(genMVCCKey1(ToString(key+3).data()).ToString()),"v3");
      ASSERT_EQ(Get(genMVCCKey1(ToString(key+4).data()).ToString()),"v4");
    }
  }
}

TEST_F(PureMemBasicTest, SerialPureMemHugeDataTest) {
  Options options = CurrentOptions();
  options.pureMemTable = true;
//  options.encode_version_node = false;
  options.allow_concurrent_memtable_write = false;
  Reopen(options);

  for(int i = 0;i < 300000;i++){
    std::string str_key = "key" + ToString(i);
    const char* key = str_key.data();
    ASSERT_OK(Put(genMVCCKey1(key), std::string("v", 1024)));
  }
  ASSERT_EQ(std::string("v", 1024), Get(genMVCCKey1("key2333").ToString()));
}

bool parseLogFileName(const std::string &filename,
                      uint64_t *number) {
  auto cr_point = find(filename.crbegin(), filename.crend(), '.');
  if (std::string(cr_point.base(), filename.cend()) == "log") {
    auto cr_num = find(cr_point, filename.crend(), '/');
    *number = stoul(std::string(cr_num.base(), cr_point.base() + 2));
    return true;
  }
  return false;
}

TEST_F(PureMemBasicTest, PureMemHugeDataDeleteWALTest) {
  Options options = CurrentOptions();
  options.pureMemTable = true;
//  options.encode_version_node = false;
  Reopen(options);

  for(int i = 0;i < 300000;i++){
    std::string str_key = "key" + ToString(i);
    const char* key = str_key.data();
    ASSERT_OK(Put(genMVCCKey1(key), std::string("v", 1024)));
  }

  std::this_thread::sleep_for(std::chrono::seconds(20));
  std::vector<std::string> all_files;
  std::vector<uint64_t> file_numbers;
  env_->GetChildren(dbname_, &all_files);
  for (const auto& f : all_files) {
    uint64_t number;
    if (parseLogFileName(f, &number)) {
      std::cout << f << std::endl;
      file_numbers.emplace_back(number);
    }
  }
  std::cout << file_numbers.size() << std::endl;
}

} // namespace rocksdb

int main(int argc, char **argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}